package com.kaigejava.rocketmq.maindemo.product.shunxu;

import com.kaigejava.rocketmq.maindemo.product.shunxu.dto.OrderStep;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author 凯哥Java
 * @description 顺序消息
 * 如果消息依次发送到同一个队列中，消费也是从队列中依次消费。
 * 如果只有一个队列，则全局有序
 * 如果是多个queue，则分区有序的
 * 需求：模拟订单
 * 一个订单的顺序流程是：创建订单、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中；消费的是，同一个OrderId获取到的是同一个队列。
 * @company
 * @since 2022/10/19 9:42
 */
public class ShunXuProducer {


    /**
     * 三个订单：
     * 1039：创建订单、付款、推送、完成
     * 1065：创建订单、付款
     * 7235：创建订单、付款
     * @param args
     */
    public static void main(String[] args) throws Exception {
        //1：创建消息生产者producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2：制定nameserver地址
        producer.setNamesrvAddr("192.168.50.132:9876");
        //设置发送超时时间：
        producer.setSendMsgTimeout(10000);
        producer.start();
        List<OrderStep> orderSteps = ShunXuProducer.buildOrders();
        for (int i = 0; i < orderSteps.size(); i++) {
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            OrderStep orderStep = orderSteps.get(i);
            String msgBody = dateStr+":"+orderStep;
            //这里选择topic,tags,keys,body这种构成器的
            Message message = new Message("shunxu-topic","orderTag","i"+i,msgBody.getBytes());
            //发送消息。因为我们这里需要根据订单id进行发送的。说明需要有个选择器。所以，我们选择发送多参数的方法
            /**
             * Message :消息对象
             * MessageQueueSelector :选择器
             * Object ：选择队列的业务标识(根据这个标识对不同的消息，选择发送给不同的队列。这里使用的是订单id)
             */
            SendResult result = producer.send(message, new MessageQueueSelector() {
                /**
                 * 参数说明
                 * @param queueList     当前topic下所有的队列集合
                 * @param message       消息对象
                 * @param o             选择队列的业务标识(根据这个标识对不同的消息，选择发送给不同的队列。这里使用的是订单id)
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) {
                    Long orderId = (Long) o;
                    //订单id和队列长度进行取模。
                    int index = (int) (orderId % queueList.size());
                    return queueList.get( index);
                }
            },orderStep.getOrderId());
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            int queueId = result.getMessageQueue().getQueueId();
            String offsetMegId = result.getOffsetMsgId();
            long offset = result.getQueueOffset();
            String str = new String(message.getBody());
            String sendResultMsg = "线程名称:【"+Thread.currentThread().getName()+"】\t 消费者队列id:"+queueId +"\t 消息发送内容:"+str;
            System.out.println(sendResultMsg);
        }

        producer.shutdown();
    }


    private static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}
